﻿using NsqSharp;
using NsqSharp.Utils.Loggers;

using System.Text;

namespace anydata.Models
{
    /// <summary>
    /// Wraps an NsqSharp <see cref="Consumer"/> for a single topic/channel pair whose
    /// nsqd nodes are discovered through the configured nsqlookupd addresses.
    /// </summary>
    public class NsqConsumer
    {
        private Consumer _consumer;
        private readonly string[] _nsqLookups;
        private readonly string _topic;
        private readonly string _channel;
        private readonly ILogger _logger;

        /// <summary>
        /// Stores the connection settings; no network activity happens until
        /// <see cref="Subscribe"/> is called.
        /// </summary>
        /// <param name="nsqLookups">nsqlookupd HTTP addresses (host:port). The first entry is queried for nodes.</param>
        /// <param name="topic">Topic to consume.</param>
        /// <param name="channel">Channel to consume on.</param>
        /// <param name="logger">Logger handed to the message handler.</param>
        public NsqConsumer(string[] nsqLookups, string topic,
            string channel, ILogger logger)
        {
            _logger = logger;
            _nsqLookups = nsqLookups;
            _topic = topic;
            _channel = channel;
        }

        /// <summary>
        /// Queries the first nsqlookupd for its nodes, publishes a "heartbeat" message to the
        /// topic on every reported nsqd, then starts a consumer that forwards messages to
        /// <paramref name="_onMessage"/>. Does nothing when the lookup reports no producers.
        /// </summary>
        /// <param name="_client">HTTP client used to query the nsqlookupd <c>/nodes</c> endpoint.</param>
        /// <param name="_onMessage">Callback attached to the message handler's message event.</param>
        /// <param name="stoppingToken">Cancels the lookup request and the heartbeat loop.</param>
        /// <exception cref="ArgumentNullException"><paramref name="_client"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No nsqlookupd address was configured.</exception>
        /// <exception cref="OperationCanceledException"><paramref name="stoppingToken"/> was cancelled.</exception>
        public async Task Subscribe(HttpClient _client, Action<string> _onMessage, CancellationToken stoppingToken)
        {
            ArgumentNullException.ThrowIfNull(_client);
            if (_nsqLookups is null || _nsqLookups.Length == 0)
            {
                // Fail with an actionable message instead of "Sequence contains no elements".
                throw new InvalidOperationException("At least one nsqlookupd address is required to subscribe.");
            }

            var nsqLogger = new ConsoleLogger(NsqSharp.Core.LogLevel.Error);
            var nodes = await _client.GetFromJsonAsync<NsqNodes>($"http://{_nsqLookups[0]}/nodes", stoppingToken);
            if (nodes?.producers is null || nodes.producers.Count == 0)
            {
                return;
            }

            var nodeCount = nodes.producers.Count;
            var heartbeat = Encoding.UTF8.GetBytes("heartbeat");

            // NOTE(review): presumably this publish makes sure the topic exists on every nsqd
            // before the consumer starts polling the lookups — confirm.
            foreach (var node in nodes.GetNsqdTcps())
            {
                stoppingToken.ThrowIfCancellationRequested();

                var producer = new Producer(node, nsqLogger);
                try
                {
                    await producer.PublishAsync(_topic, heartbeat);
                }
                finally
                {
                    // The producer is only needed for this one message; stop it so its
                    // connection is not leaked.
                    producer.Stop();
                }
            }

            // A repeated Subscribe must not orphan a consumer that is still running.
            await StopAsync();

            _consumer = new Consumer(_topic, _channel, nsqLogger,
                new NsqSharp.Config()
                {
                    MaxInFlight = nodeCount,
                    LookupdPollInterval = TimeSpan.FromSeconds(1),
                    HeartbeatInterval = TimeSpan.FromSeconds(2)
                });

            var handler = new MessageHandler(_logger);
            handler.OnMessgae += _onMessage;

            // One concurrent handler per reported nsqd node.
            _consumer.AddHandler(handler, nodeCount);
            _consumer.ConnectToNsqLookupd(_nsqLookups);
        }

        /// <summary>
        /// Stops the consumer created by <see cref="Subscribe"/>, if there is one, and forgets
        /// it so that repeated calls are no-ops.
        /// </summary>
        public async Task StopAsync()
        {
            var consumer = _consumer;
            if (consumer == null)
            {
                return;
            }

            _consumer = null;
            await consumer.StopAsync();
        }
    }
}
